<!DOCTYPE html>
<html lang="zh-cn">
<head>
  
    <link type="text/css" rel="stylesheet" href="/bundles/blog-common.css?v=KOZafwuaDasEedEenI5aTy8aXH0epbm6VUJ0v3vsT_Q1"/>
<link id="MainCss" type="text/css" rel="stylesheet" href="/skins/ThinkInside/bundle-ThinkInside.css?v=RRjf6pEarGnbXZ86qxNycPfQivwSKWRa4heYLB15rVE1"/>
<link type="text/css" rel="stylesheet" href="/blog/customcss/428549.css?v=%2fam3bBTkW5NBWhBE%2fD0lcyJv5UM%3d"/>

</head>
<body>
<a name="top"></a>

<div id="page_begin_html"></div><script>load_page_begin_html();</script>

<div id="topics">
	<div class = "post">
		<h1 class = "postTitle">
			<a id="cb_post_title_url" class="postTitle2" href="https://www.cnblogs.com/frankdeng/p/9310704.html">Kafka（三）Kafka的高可用与生产消费过程解析</a>
		</h1>
		<div class="clear"></div>
		<div class="postBody">
			<div id="cnblogs_post_body" class="blogpost-body"><h2>一&nbsp; Kafka HA设计解析</h2>
<h3>1.1 为何需要Replication</h3>
<p>　　在Kafka在0.8以前的版本中，是没有Replication的，一旦某一个Broker宕机，则其上所有的Partition数据都不可被消费，这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。</p>
<p>　　如果Producer使用同步模式则Producer会在尝试重新发送message.send.max.retries（默认值为3）次后抛出Exception，用户可以选择停止发送后续数据也可选择继续选择发送。而前者会造成数据的阻塞，后者会造成本应发往该Broker的数据的丢失。</p>
<p>　　如果Producer使用异步模式，则Producer会尝试重新发送message.send.max.retries（默认值为3）次后记录该异常并继续发送后续数据，这会造成数据丢失并且用户只能通过日志发现该问题。同时，Kafka的Producer并未对异步模式提供callback接口。</p>
<p>　　由此可见，在没有Replication的情况下，一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加，整个集群中出现该类异常的几率大大增加，因此对于生产系统而言Replication机制的引入非常重要。</p>
<h3>1.2 Leader Election</h3>
<p>　　引入Replication之后，同一个Partition可能会有多个Replica，而这时需要在这些Replication之间选出一个Leader，Producer和Consumer只与这个Leader交互，其它Replica作为Follower从Leader中复制数据。</p>
<p>　　因为需要保证同一个Partition的多个Replica之间的数据一致性（其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失）。如果没有一个Leader，所有Replica都可同时读/写数据，那就需要保证多个Replica之间互相（N&times;N条通路）同步数据，数据的一致性和有序性非常难保证，大大增加了Replication实现的复杂性，同时也增加了出现异常的几率。而引入Leader后，只有Leader负责数据读写，Follower只向Leader顺序Fetch数据（N条通路），系统更加简单且高效。</p>
<h3>1.3 如何将所有Replica均匀分布到整个集群</h3>
<p>为了更好的做负载均衡，Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力，也需要将同一个Partition的Replica尽量分散到不同的机器。实际上，如果所有的Replica都在同一个Broker上，那一旦该Broker宕机，该Partition的所有Replica都无法工作，也就达不到HA的效果。同时，如果某个Broker宕机了，需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。</p>
<p>Kafka分配Replica的算法如下：</p>
<p>1.将所有Broker（假设共n个Broker）和待分配的Partition排序</p>
<p>2.将第i个Partition分配到第（i mod n）个Broker上</p>
<p>3.将第i个Partition的第j个Replica分配到第（(i + j) mode n）个Broker上</p>
<h3>1.4 Data Replication(副本策略）</h3>
<p>Kafka的高可靠性的保障来源于其健壮的副本（replication）策略。</p>
<h4>1.4.1 消息传递同步策略</h4>
<p>Producer在发布消息到某个Partition时，先通过ZooKeeper找到该Partition的Leader，然后无论该Topic的Replication Factor为多少，Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上，Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后，向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK，该消息就被认为已经commit了，Leader将增加HW并且向Producer发送ACK。</p>
<p>为了提高性能，每个Follower在接收到数据后就立马向Leader发送ACK，而非等到数据写入Log中。因此，对于已经commit的消息，Kafka只能保证它被存于多个Replica的内存中，而不能保证它们被持久化到磁盘中，也就不能完全保证异常发生后该条消息一定能被Consumer消费。</p>
<p>Consumer读消息也是从Leader读取，只有被commit过的消息才会暴露给Consumer。</p>
<p>Kafka Replication的数据流如下图所示：</p>
<p><img src="https://images2018.cnblogs.com/blog/1228818/201805/1228818-20180507194612622-1788087919.png" alt="" width="700" /></p>
<h4>1.4.2 ACK前需要保证有多少个备份</h4>
<p>对于Kafka而言，定义一个Broker是否&ldquo;活着&rdquo;包含两个条件：</p>
<ul>
<li>一是它必须维护与ZooKeeper的session（这个通过ZooKeeper的Heartbeat机制来实现）。</li>
<li>二是Follower必须能够及时将Leader的消息复制过来，不能&ldquo;落后太多&rdquo;。</li>
</ul>
<p>Leader会跟踪与其保持同步的Replica列表，该列表称为ISR（即in-sync Replica）。如果一个Follower宕机，或者落后太多，Leader将把它从ISR中移除。这里所描述的&ldquo;落后太多&rdquo;指Follower复制的消息落后于Leader后的条数超过预定值（该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置，其默认值是4000）或者Follower超过一定时间（该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置，其默认值是10000）未向Leader发送fetch请求。</p>
<p>Kafka的复制机制既不是完全的同步复制，也不是单纯的异步复制。事实上，完全同步复制要求所有能工作的Follower都复制完，这条消息才会被认为commit，这种复制方式极大的影响了吞吐率（高吞吐率是Kafka非常重要的一个特性）。而异步复制方式下，Follower异步的从Leader复制数据，数据只要被Leader写入log就被认为已经commit，这种情况下如果Follower都复制完都落后于Leader，而如果Leader突然宕机，则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据，这样极大的提高复制性能（批量写磁盘），极大减少了Follower与Leader的差距。</p>
<p>需要说明的是，Kafka只解决fail/recover，不处理&ldquo;Byzantine&rdquo;（&ldquo;拜占庭&rdquo;）问题。一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader，还没来得及被任何Follower复制就宕机了，而造成数据丢失（Consumer无法消费这些数据）。而对于Producer而言，它可以选择是否等待消息commit，这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower，一条被commit的消息就不会丢失。</p>
<h4>1.4.3 Leader Election算法</h4>
<p>Leader选举本质上是一个分布式锁，有两种方式实现基于ZooKeeper的分布式锁：</p>
<ul>
<li>节点名称唯一性：多个客户端创建一个节点，只有成功创建节点的客户端才能获得锁</li>
<li>临时顺序节点：所有客户端在某个目录下创建自己的临时顺序节点，只有序号最小的才获得锁</li>
</ul>
<p>一种非常常用的选举leader的方式是&ldquo;Majority Vote&rdquo;（&ldquo;少数服从多数&rdquo;），但Kafka并未采用这种方式。这种模式下，如果我们有2f+1个Replica（包含Leader和Follower），那在commit之前必须保证有f+1个Replica复制完消息，为了保证正确选出新的Leader，fail的Replica不能超过f个。因为在剩下的任意f+1个Replica里，至少有一个Replica包含有最新的所有消息。这种方式有个很大的优势，系统的latency只取决于最快的几个Broker，而非最慢那个。Majority Vote也有一些劣势，为了保证Leader Election的正常进行，它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉，必须要有3个以上的Replica，如果要容忍2个Follower挂掉，必须要有5个以上的Replica。也就是说，在生产环境下为了保证较高的容错程度，必须要有大量的Replica，而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal，但是它的数据存储并没有使用这种方式。</p>
<p>Kafka在ZooKeeper中动态维护了一个ISR（in-sync replicas），这个ISR里的所有Replica都跟上了leader，只有ISR里的成员才有被选为Leader的可能。在这种模式下，对于f+1个Replica，一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中，这种模式是非常有利的。事实上，为了容忍f个Replica的失败，Majority Vote和ISR在commit前需要等待的Replica数量是一样的，但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。</p>
<p>虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势，但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题，并且节省下来的Replica和磁盘使得ISR模式仍然值得。</p>
<h4>1.4.4　如何处理所有Replica都不工作</h4>
<p>在ISR中至少有一个follower时，Kafka可以确保已经commit的数据不丢失，但如果某个Partition的所有Replica都宕机了，就无法保证数据不丢失了。这种情况下有两种可行的方案：</p>
<p>1.等待ISR中的任一个Replica&ldquo;活&rdquo;过来，并且选它作为Leader</p>
<p>2.选择第一个&ldquo;活&rdquo;过来的Replica（不一定是ISR中的）作为Leader</p>
<p>这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica&ldquo;活&rdquo;过来，那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法&ldquo;活&rdquo;过来了，或者数据都丢失了，这个Partition将永远不可用。选择第一个&ldquo;活&rdquo;过来的Replica作为Leader，而这个Replica不是ISR中的Replica，那即使它并不保证已经包含了所有已commit的消息，它也会成为Leader而作为consumer的数据源（前文有说明，所有读写都由Leader完成）。Kafka0.8.*使用了第二种方式。根据Kafka的文档，在以后的版本中，Kafka支持用户通过配置选择这两种方式中的一种，从而根据不同的使用场景选择高可用性还是强一致性。</p>
<h4>1.4.5　选举Leader</h4>
<p>最简单最直观的方案是，所有Follower都在ZooKeeper上设置一个Watch，一旦Leader宕机，其对应的ephemeral znode会自动删除，此时所有Follower都尝试创建该节点，而创建成功者（ZooKeeper保证只有一个能创建成功）即是新的Leader，其它Replica即为Follower。</p>
<p>但是该方法会有3个问题：</p>
<p>1.split-brain 这是由ZooKeeper的特性引起的，虽然ZooKeeper能保证所有Watch按顺序触发，但并不能保证同一时刻所有Replica&ldquo;看&rdquo;到的状态是一样的，这就可能造成不同Replica的响应不一致</p>
<p>2.herd effect 如果宕机的那个Broker上的Partition比较多，会造成多个Watch被触发，造成集群内大量的调整</p>
<p>3.ZooKeeper负载过重 每个Replica都要为此在ZooKeeper上注册一个Watch，当集群规模增加到几千个Partition时ZooKeeper负载会过重。</p>
<p>Kafka 0.8.*的Leader Election方案解决了上述问题，它在所有broker中选出一个controller，所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式（比ZooKeeper Queue的方式更高效）通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。</p>
<h2>二&nbsp;<strong>Kafka生产过程分析</strong></h2>
<h3 id="blogTitle8"><strong>2</strong><strong>.1&nbsp;</strong><strong>写入方式</strong></h3>
<p>producer采用推（push）模式将消息发布到broker，每条消息都被追加（append）到分区（patition）中，属于顺序写磁盘（顺序写磁盘效率比随机写内存要高，保障kafka吞吐率）。</p>
<h3 id="blogTitle9"><strong>2.2&nbsp;</strong><strong>分区</strong><strong>（Partition）</strong></h3>
<p>Kafka集群有多个消息代理服务器（broker-server）组成，发布到Kafka集群的每条消息都有一个类别，用主题（topic）来表示。通常，不同应用产生不同类型的数据，可以设置不同的主题。一个主题一般会有多个消息的订阅者，当生产者发布消息到某个主题时，订阅了这个主题的消费者都可以接收到生成者写入的新消息。</p>
<p class="p">Kafka集群为每个主题维护了分布式的分区（partition）日志文件，物理意义上可以把主题（topic）看作进行了分区的日志文件（partition&nbsp;log）。主题的每个分区都是一个有序的、不可变的记录序列，新的消息会不断追加到日志中。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号，叫做偏移量（offset），这个偏移量能够唯一地定位当前分区中的每一条消息。</p>
<p class="p">消息发送时都被发送到一个topic，其本质就是一个目录，而topic是由一些Partition Logs(分区日志)组成，其组织结构如下图所示：</p>
<p class="p">下图中的topic有3个分区，每个分区的偏移量都从0开始，不同分区之间的偏移量都是独立的，不会相互影响。&nbsp;</p>
<p class="p"><img src="https://images2018.cnblogs.com/blog/1385722/201806/1385722-20180603183019597-557434132.png" alt="" /><img src="https://images2018.cnblogs.com/blog/1385722/201806/1385722-20180603183039669-990675999.png" alt="" /></p>
<p class="p">我们可以看到，每个Partition中的消息都是有序的，生产的消息被不断追加到Partition log上，其中的每一个消息都被赋予了一个唯一的offset值。</p>
<p class="p">发布到Kafka主题的每条消息包括键值和时间戳。消息到达服务器端的指定分区后，都会分配到一个自增的偏移量。原始的消息内容和分配的偏移量以及其他一些元数据信息最后都会存储到分区日志文件中。消息的键也可以不用设置，这种情况下消息会均衡地分布到不同的分区。</p>
<p class="p">1）&nbsp;分区的原因</p>
<p class="p">（1）方便在集群中扩展，每个Partition可以通过调整以适应它所在的机器，而一个topic又可以有多个Partition组成，因此整个集群就可以适应任意大小的数据了；</p>
<p class="p">（2）可以提高并发，因为可以以Partition为单位读写了。</p>
<p class="p">传统消息系统在服务端保持消息的顺序，如果有多个消费者消费同一个消息队列，服务端会以消费存储的顺序依次发送给消费者。但由于消息是异步发送给消费者的，消息到达消费者的顺序可能是无序的，这就意味着在并行消费时，传统消息系统无法很好地保证消息被顺序处理。虽然我们可以设置一个专用的消费者只消费一个队列，以此来解决消息顺序的问题，但是这就使得消费处理无法真正执行。</p>
<p class="p">Kafka比传统消息系统有更强的顺序性保证，它使用主题的分区作为消息处理的并行单元。Kafka以分区作为最小的粒度，将每个分区分配给消费者组中不同的而且是唯一的消费者，并确保一个分区只属于一个消费者，即这个消费者就是这个分区的唯一读取线程。那么，只要分区的消息是有序的，消费者处理的消息顺序就有保证。每个主题有多个分区，不同的消费者处理不同的分区，所以Kafka不仅保证了消息的有序性，也做到了消费者的负载均衡。</p>
<p>2）分区的原则</p>
<p>（1）指定了patition，则直接使用；</p>
<p>（2）未指定patition但指定key，通过对key的value进行hash出一个patition</p>
<p>（3）patition和key都未指定，使用轮询选出一个patition。</p>
<div class="cnblogs_code">
<pre><span style="color: #000000;">DefaultPartitioner类

</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">int</span> partition(String topic, Object key, <span style="color: #0000ff;">byte</span>[] keyBytes, Object value, <span style="color: #0000ff;">byte</span><span style="color: #000000;">[] valueBytes, Cluster cluster) {
        List</span>&lt;PartitionInfo&gt; partitions =<span style="color: #000000;"> cluster.partitionsForTopic(topic);
        </span><span style="color: #0000ff;">int</span> numPartitions =<span style="color: #000000;"> partitions.size();
        </span><span style="color: #0000ff;">if</span> (keyBytes == <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
            </span><span style="color: #0000ff;">int</span> nextValue =<span style="color: #000000;"> nextValue(topic);
            List</span>&lt;PartitionInfo&gt; availablePartitions =<span style="color: #000000;"> cluster.availablePartitionsForTopic(topic);
            </span><span style="color: #0000ff;">if</span> (availablePartitions.size() &gt; <span style="color: #800080;">0</span><span style="color: #000000;">) {
                </span><span style="color: #0000ff;">int</span> part = Utils.toPositive(nextValue) %<span style="color: #000000;"> availablePartitions.size();
                </span><span style="color: #0000ff;">return</span> availablePartitions.<span style="color: #0000ff;">get</span><span style="color: #000000;">(part).partition();
            } </span><span style="color: #0000ff;">else</span><span style="color: #000000;"> {
                </span><span style="color: #008000;">//</span><span style="color: #008000;"> no partitions are available, give a non-available partition</span>
                <span style="color: #0000ff;">return</span> Utils.toPositive(nextValue) %<span style="color: #000000;"> numPartitions;
            }
        } </span><span style="color: #0000ff;">else</span><span style="color: #000000;"> {
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> hash the keyBytes to choose a partition</span>
            <span style="color: #0000ff;">return</span> Utils.toPositive(Utils.murmur2(keyBytes)) %<span style="color: #000000;"> numPartitions;
        }
    }</span></pre>
</div>
<h3><strong>2.3&nbsp;</strong><strong>副本</strong><strong>（Replication）</strong></h3>
<p id="blogTitle10">同一个partition可能会有多个replication（对应 server.properties 配置中的 default.replication.factor=N）。没有replication的情况下，一旦broker 宕机，其上所有 patition 的数据都不可被消费，同时producer也不能再将数据存于其上的patition。引入replication之后，同一个partition可能会有多个replication，而这时需要在这些replication之间选出一个leader，producer和consumer只与这个leader交互，其它replication作为follower从leader 中复制数据。</p>
<h3 id="blogTitle11"><strong>2.4&nbsp; 写入流程</strong></h3>
<p class="p">&nbsp;producer写入消息流程如下：</p>
<p class="p">&nbsp;<img src="https://images2018.cnblogs.com/blog/1385722/201806/1385722-20180603183156749-880301324.png" alt="" /></p>
<p>1）producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader</p>
<p>2）producer将消息发送给该leader</p>
<p>3）leader将消息写入本地log</p>
<p>4）followers从leader pull消息，写入本地log后向leader发送ACK</p>
<p>5）leader收到所有ISR中的replication的ACK后，增加HW（high watermark，最后commit 的offset）并向producer发送ACK</p>
<h2>三 broker保存消息</h2>
<h3>3.1 存储方式</h3>
<p>物理上把 topic 分成一个或多个 patition（对应 server.properties 中的 num.partitions=3 配置），每个 patition 物理上对应一个文件夹（该文件夹存储该 patition 的所有消息和索引文件），如下：</p>
<p><img src="https://images2018.cnblogs.com/blog/1228818/201805/1228818-20180507200226759-1617322728.png" alt="" /></p>
<h3>3.2 存储策略</h3>
<p>无论消息是否被消费，kafka 都会保留所有消息。有两种策略可以删除旧数据：</p>
<div class="cnblogs_code">
<pre>1、 基于时间：log.retention.hours=168 
2、 基于大小：log.retention.bytes=1073741824</pre>
</div>
<p>需要注意的是，因为Kafka读取特定消息的时间复杂度为O(1)，即与文件大小无关，所以这里删除过期文件与提高 Kafka 性能无关。</p>
<h3>3.3Zookeeper存储<span style="font-family: 宋体;">结构</span></h3>
<p><img src="https://images2018.cnblogs.com/blog/1228818/201805/1228818-20180507195223218-1719228508.png" alt="" width="1000" /></p>
<h4>admin</h4>
<p>该目录下znode只有在有相关操作时才会存在，操作结束时会将其删除</p>
<p>/admin/reassign_partitions用于将一些Partition分配到不同的broker集合上。对于每个待重新分配的Partition，Kafka会在该znode上存储其所有的Replica和相应的Broker id。该znode由管理进程创建并且一旦重新分配成功它将会被自动移除。</p>
<h4>broker</h4>
<p>即/brokers/ids/[brokerId]）存储&ldquo;活着&rdquo;的broker信息。</p>
<p>topic注册信息（/brokers/topics/[topic]），存储该topic的所有partition的所有replica所在的broker id，第一个replica即为preferred replica，对一个给定的partition，它在同一个broker上最多只有一个replica,因此broker id可作为replica id。</p>
<h4>controller</h4>
<p>/controller -&gt; int (broker id of the controller)存储当前controller的信息</p>
<p>/controller_epoch -&gt; int (epoch)直接以整数形式存储controller epoch，而非像其它znode一样以JSON字符串形式存储。</p>
<h2>四&nbsp;<strong>Kafka消费过程分析</strong></h2>
<p>kafka提供了两套consumer API：高级Consumer API和低级API。</p>
<h3 id="blogTitle17"><strong>4.1</strong><strong>消费模型</strong></h3>
<p>消息由生产者发布到Kafka集群后，会被消费者消费。消息的消费模型有两种：推送模型（push）和拉取模型（pull）。</p>
<p>基于推送模型（push）的消息系统，由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后，标记这条消息为已消费，但这种方式无法很好地保证消息被处理。比如，消息代理把消息发送出去后，当消费进程挂掉或者由于网络原因没有收到这条消息时，就有可能造成消息丢失（因为消息代理已经把这条消息标记为已消费了，但实际上这条消息并没有被实际处理）。如果要保证消息被处理，消息代理发送完消息后，要设置状态为&ldquo;已发送&rdquo;，只有收到消费者的确认请求后才更新为&ldquo;已消费&rdquo;，这就需要消息代理中记录所有的消费状态，这种做法显然是不可取的。</p>
<p>Kafka采用拉取模型，由消费者自己记录消费状态，每个消费者互相独立地顺序读取每个分区的消息。如下图所示，有两个消费者（不同消费者组）拉取同一个主题的消息，消费者A的消费进度是3，消费者B的消费进度是6。消费者拉取的最大上限通过最高水位（watermark）控制，生产者最新写入的消息如果还没有达到备份数量，对消费者是不可见的。这种由消费者控制偏移量的优点是：消费者可以按照任意的顺序消费消息。比如，消费者可以重置到旧的偏移量，重新处理之前已经消费过的消息；或者直接跳到最近的位置，从当前的时刻开始消费。</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201806/1385722-20180603183239318-1642948549.png" alt="" /></p>
<p>在一些消息系统中，消息代理会在消息被消费之后立即删除消息。如果有不同类型的消费者订阅同一个主题，消息代理可能需要冗余地存储同一消息；或者等所有消费者都消费完才删除，这就需要消息代理跟踪每个消费者的消费状态，这种设计很大程度上限制了消息系统的整体吞吐量和处理延迟。Kafka的做法是生产者发布的所有消息会一致保存在Kafka集群中，不管消息有没有被消费。用户可以通过设置保留时间来清理过期的数据，比如，设置保留策略为两天。那么，在消息发布之后，它可以被不同的消费者消费，在两天之后，过期的消息就会自动清理掉。</p>
<h3 id="blogTitle18"><strong>4.2</strong><strong>高级</strong><strong>API</strong></h3>
<p>1）高级API优点</p>
<p>高级API 写起来简单</p>
<p>不需要自行去管理offset，系统通过zookeeper自行管理。</p>
<p>不需要管理分区，副本等情况，.系统自动管理。</p>
<p>消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据（默认设置1分钟更新一下zookeeper中存的offset）</p>
<p>可以使用group来区分对同一个topic 的不同程序访问分离开来（不同的group记录不同的offset，这样不同程序读取同一个topic才不会因为offset互相影响）</p>
<p>2）高级API缺点</p>
<p>不能自行控制offset（对于某些特殊需求来说）</p>
<p>不能细化控制如分区、副本、zk等</p>
<h3 id="blogTitle19"><strong>4.3</strong><strong>低级</strong><strong>API</strong></h3>
<p>1）低级&nbsp;API&nbsp;优点</p>
<p>能够让开发者自己控制offset，想从哪里读取就从哪里读取。</p>
<p>自行控制连接分区，对分区自定义进行负载均衡</p>
<p>对zookeeper的依赖性降低（如：offset不一定非要靠zk存储，自行存储offset即可，比如存在文件或者内存中）</p>
<p>2）低级API缺点</p>
<p>太过复杂，需要自行控制offset，连接哪个分区，找到分区leader 等。</p>
<h3 id="blogTitle20"><strong>4.4</strong><strong>消费者组</strong></h3>
<h3 id="blogTitle21"><strong><img src="https://images2018.cnblogs.com/blog/1385722/201806/1385722-20180603183316125-788771920.png" alt="" /></strong></h3>
<p>消费者是以consumer group消费者组的方式工作，由一个或者多个消费者组成一个组，共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取，但是多个group可以同时消费这个partition。在图中，有一个由三个消费者组成的group，有一个消费者读取主题中的两个分区，另外两个分别读取一个分区。某个消费者读取某个分区，也可以叫做某个消费者是某个分区的拥有者。</p>
<p>在这种情况下，消费者可以通过水平扩展的方式同时读取大量的消息。另外，如果一个消费者失败了，那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。</p>
<h3 id="blogTitle22"><strong>4.5 消费方式</strong></h3>
<p>consumer采用pull（拉）模式从broker中读取数据。</p>
<p>push（推）模式很难适应消费速率不同的消费者，因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息，但是这样很容易造成consumer来不及处理消息，典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。</p>
<p>对于Kafka而言，pull模式更合适，它可简化broker的设计，consumer可自主控制消费消息的速率，同时consumer可以自己控制消费方式&mdash;&mdash;即可批量消费也可逐条消费，同时还能选择不同的提交方式从而实现不同的传输语义。</p>
<p>pull模式不足之处是，如果kafka没有数据，消费者可能会陷入循环中，一直等待数据到达。为了避免这种情况，我们在我们的拉请求中有参数，允许消费者请求在等待数据到达的&ldquo;长轮询&rdquo;中进行阻塞（并且可选地等待到给定的字节数，以确保大的传输大小）。</p>
<h3 id="blogTitle23"><strong>4.6</strong><strong>消费者</strong><strong>组案例</strong></h3>
<p>1）需求：测试同一个消费者组中的消费者，同一时刻只能有一个消费者消费。</p>
<p>2）案例实操</p>
<p>（1）在node21、node22上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。</p>
<p>[root@node22 config]$ vi consumer.properties</p>
<p>group.id=admin</p>
<p>（2）在node21、node22上分别启动消费者</p>
<p>[root@node21 kafka]$ bin/kafka-console-consumer.sh --zookeeper&nbsp;node21:2181,node22:2181,node23:2181&nbsp; --topic first --consumer.config config/consumer.properties</p>
<p>[root@node22 kafka]$ bin/kafka-console-consumer.sh --zookeeper&nbsp;node21:2181,node22:2181,node23:2181&nbsp; --topic first --consumer.config config/consumer.properties</p>
<p>（3）在node23上启动生产者</p>
<p>[root@node23 kafka]$ bin/kafka-console-producer.sh --broker-list node21:9092 --topic first</p>
<p>&gt;hello world</p>
<p>（4）查看node21和node22的接收者。</p>
<p>同一时刻只有一个消费者接收到消息。</p>
<h2>五 Topic的创建和删除</h2>
<h3>5.1 创建topic</h3>
<p>创建 topic 的序列图如下所示：</p>
<p><img src="https://images2018.cnblogs.com/blog/1228818/201805/1228818-20180507200343317-1340406332.png" alt="" width="1000" /></p>
<p>流程说明：</p>
<blockquote>
<pre>1、 controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher，当 topic 被创建，则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2、 controller从 /brokers/ids 读取当前所有可用的 broker 列表，对于 set_p 中的每一个 partition：
     2.1、 从分配给该 partition 的所有 replica（称为AR）中任选一个可用的 broker 作为新的 leader，并将AR设置为新的 ISR 
     2.2、 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state 
3、 controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。</pre>
</blockquote>
<h3>5.2 删除topic</h3>
<p>删除 topic 的序列图如下所示：</p>
<p><img src="https://images2018.cnblogs.com/blog/1228818/201805/1228818-20180507200533571-310409492.png" alt="" width="1000" /></p>
<p>流程说明：</p>
<blockquote>
<pre>1、 controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher，当 topic 被删除，则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。 
2、 若 delete.topic.enable=false，结束；否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire，controller 通过回调向对应的 broker 发送 StopReplicaRequest。</pre>
</blockquote>
<h2>六&nbsp;<strong>broker failover</strong></h2>
<p>kafka broker failover 序列图如下所示：</p>
<p><strong><img src="https://images2018.cnblogs.com/blog/1228818/201805/1228818-20180507200729833-108400321.png" alt="" width="1000" /></strong></p>
<p>流程说明：</p>
<blockquote>
<pre>1、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher，当 broker 宕机时 zookeeper 会 fire watch
2、 controller 从 /brokers/ids 节点读取可用broker 
3、 controller决定set_p，该集合包含宕机 broker 上的所有 partition 
4、 对 set_p 中的每一个 partition 
    4.1、 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR 
    4.2、 决定新 leader 
    4.3、 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
5、 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令</pre>
</blockquote>
<h2>七&nbsp;<strong>controller failover</strong></h2>
<p>当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher，当 controller 宕机时 zookeeper 中的临时节点消失，所有存活的 broker 收到 fire 的通知，每个 broker 都尝试创建新的 controller path，只有一个竞选成功并当选为 controller。</p>
<p>当新的 controller 当选时，会触发 KafkaController.onControllerFailover 方法，在该方法中完成如下操作：</p>
<blockquote>
<pre>1、 读取并增加 Controller Epoch。 
2、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。 
3、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。 
4、 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。 
5、 若 delete.topic.enable=true（默认值是 false），则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。 
6、 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。 
7、 初始化 ControllerContext 对象，设置当前所有 topic，&ldquo;活&rdquo;着的 broker 列表，所有 partition 的 leader 及 ISR等。 
8、 启动 replicaStateMachine 和 partitionStateMachine。 
9、 将 brokerState 状态设置为 RunningAsController。 
10、 将每个 partition 的 Leadership 信息发送给所有&ldquo;活&rdquo;着的 broker。 
11、 若 auto.leader.rebalance.enable=true（默认值是true），则启动 partition-rebalance 线程。 
12、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值，则删除相应的Topic。</pre>
</blockquote></div>

</body>
</html>
